Skip to content

Commit

Permalink
Add resource defs to source asset, handle transitive dependencies (#8223)
Browse files Browse the repository at this point in the history

* Add resource defs to source asset, handle transitive dependencies

* Add another layer of transitivity to dep test

* Refactor resource transitivity dependency code

* Test fixes

* Move to using fs_io_manager as default for with_resources

* Make with_resources generic, fix lint errors, fix asset group

* Add group name

* Use default_job_io_manager in build_assets_job

* Fix test failures, subset case
  • Loading branch information
dpeng817 committed Jun 8, 2022
1 parent e9ebf8f commit a1a31d4
Show file tree
Hide file tree
Showing 15 changed files with 457 additions and 139 deletions.
13 changes: 3 additions & 10 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
from dagster.core.definitions.executor_definition import in_process_executor
from dagster.core.errors import DagsterUnmetExecutorRequirementsError
from dagster.core.execution.execute_in_process_result import ExecuteInProcessResult
from dagster.core.execution.with_resources import with_resources
from dagster.core.selector.subset_selector import AssetSelectionData
from dagster.core.storage.fs_io_manager import fs_io_manager
from dagster.utils import merge_dicts
from dagster.utils.backcompat import ExperimentalWarning

from ..definitions.asset_layer import build_asset_selection_job
Expand Down Expand Up @@ -106,14 +105,8 @@ def __init__(
)
executor_def = check.opt_inst_param(executor_def, "executor_def", ExecutorDefinition)

# In the case of collisions, merge_dicts takes values from the
# dictionary latest in the list, so we place the user provided resource
# defs after the defaults.
resource_defs = merge_dicts({"io_manager": fs_io_manager}, resource_defs)

_validate_resource_reqs_for_asset_group(
asset_list=assets, source_assets=source_assets, resource_defs=resource_defs
)
assets = with_resources(assets, resource_defs)
source_assets = with_resources(source_assets, resource_defs)

self._assets = assets
self._source_assets = source_assets
Expand Down
30 changes: 22 additions & 8 deletions python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin
asset_deps=self._asset_deps,
can_subset=self.can_subset,
selected_asset_keys=selected_asset_keys & self.asset_keys,
resource_defs=self.resource_defs,
)

def to_source_assets(self) -> Sequence[SourceAsset]:
Expand All @@ -354,6 +355,7 @@ def to_source_assets(self) -> Sequence[SourceAsset]:
metadata=output_def.metadata,
io_manager_key=output_def.io_manager_key,
description=output_def.description,
resource_defs=self.resource_defs,
)
)

Expand All @@ -364,17 +366,28 @@ def get_resource_requirements(self) -> Iterator[ResourceRequirement]:
for source_key, resource_def in self.resource_defs.items():
yield from resource_def.get_resource_requirements(outer_context=source_key)

@property
def required_resource_keys(self) -> Set[str]:
    """Return the distinct keys of all resource requirements of this definition.

    The keys are gathered from ``self.get_resource_requirements()``;
    duplicates collapse because the result is a set.
    """
    return set(req.key for req in self.get_resource_requirements())

def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "AssetsDefinition":
from dagster.core.execution.resources_init import get_transitive_required_resource_keys

merged_resource_defs = merge_dicts(resource_defs, self.resource_defs)
ensure_requirements_satisfied(
merged_resource_defs,
[
requirement
for requirement in self.get_resource_requirements()
if requirement.key != "io_manager"
],

# Ensure top-level resource requirements are met - except for
# io_manager, since that is a default it can be resolved later.
ensure_requirements_satisfied(merged_resource_defs, list(self.get_resource_requirements()))

# Get all transitive resource dependencies from other resources.
relevant_keys = get_transitive_required_resource_keys(
self.required_resource_keys, merged_resource_defs
)
relevant_resource_defs = {
key: resource_def
for key, resource_def in merged_resource_defs.items()
if key in relevant_keys
}

return AssetsDefinition(
asset_keys_by_input_name=self._asset_keys_by_input_name,
Expand All @@ -385,7 +398,8 @@ def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "As
asset_deps=self._asset_deps,
selected_asset_keys=self._selected_asset_keys,
can_subset=self._can_subset,
resource_defs=merged_resource_defs,
resource_defs=relevant_resource_defs,
group_names=self._group_names,
)


Expand Down
57 changes: 30 additions & 27 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@
)
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.executor_definition import ExecutorDefinition
from dagster.core.definitions.graph_definition import GraphDefinition
from dagster.core.definitions.graph_definition import GraphDefinition, default_job_io_manager
from dagster.core.definitions.job_definition import JobDefinition
from dagster.core.definitions.output import OutputDefinition
from dagster.core.definitions.partition import PartitionedConfig, PartitionsDefinition
from dagster.core.definitions.partition_key_range import PartitionKeyRange
from dagster.core.definitions.resource_definition import ResourceDefinition
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.execution.with_resources import with_resources
from dagster.core.selector.subset_selector import AssetSelectionData
from dagster.utils import merge_dicts
from dagster.utils.backcompat import experimental

from .asset_partitions import get_upstream_partitions_for_partition_range
Expand Down Expand Up @@ -92,15 +94,20 @@ def asset2(asset1):

check.str_param(name, "name")
check.iterable_param(assets, "assets", of_type=AssetsDefinition)
check.opt_sequence_param(
source_assets = check.opt_sequence_param(
source_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
)
check.opt_str_param(description, "description")
check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData)
resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
resource_defs = merge_dicts({"io_manager": default_job_io_manager}, resource_defs)

assets = with_resources(assets, resource_defs)
source_assets = with_resources(source_assets, resource_defs)

source_assets_by_key = build_source_assets_by_key(source_assets)

partitioned_config = build_job_partitions_from_assets(assets, source_assets or [])
resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")

deps, assets_defs_by_node_handle = build_deps(assets, source_assets_by_key.keys())
# attempt to resolve cycles using multi-asset subsetting
Expand Down Expand Up @@ -130,30 +137,7 @@ def asset2(asset1):
graph, assets_defs_by_node_handle, resolved_source_assets
)

all_resource_defs = dict(resource_defs)
for asset_def in assets:
for resource_key, resource_def in asset_def.resource_defs.items():
if (
resource_key in all_resource_defs
and all_resource_defs[resource_key] != resource_def
):
raise DagsterInvalidDefinitionError(
f"When attempting to build job, asset {asset_def.asset_key} had a conflicting version of the same resource key {resource_key}. Please resolve this conflict by giving different keys to each resource definition."
)
all_resource_defs[resource_key] = resource_def

required_io_manager_keys = set()
for source_asset in resolved_source_assets:
if not source_asset.io_manager_def:
required_io_manager_keys.add(source_asset.get_io_manager_key())
else:
all_resource_defs[source_asset.get_io_manager_key()] = source_asset.io_manager_def

for required_key in sorted(list(required_io_manager_keys)):
if required_key not in all_resource_defs and required_key != "io_manager":
raise DagsterInvalidDefinitionError(
f"Error when attempting to build job '{name}': IO Manager required for key '{required_key}', but none was provided."
)
all_resource_defs = get_all_resource_defs(assets, resolved_source_assets)

return graph.to_job(
resource_defs=all_resource_defs,
Expand Down Expand Up @@ -429,3 +413,22 @@ def _dfs(name, cur_color):
ret.append(assets_def.subset_for(asset_keys))

return ret


def get_all_resource_defs(
    assets: Sequence[AssetsDefinition], source_assets: Sequence[SourceAsset]
) -> Dict[str, ResourceDefinition]:
    """Merge the ``resource_defs`` of every asset and source asset into one dict.

    Assets are visited first, then source assets. The first definition seen
    for a key is kept; any later definition for the same key must compare
    equal to it.

    Raises:
        DagsterInvalidDefinitionError: if two assets supply definitions for
            the same resource key that do not compare equal.
    """
    collected = {}
    everything: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets]
    for asset in everything:
        for resource_key, resource_def in asset.resource_defs.items():
            # setdefault stores the definition on first sight and otherwise
            # hands back the one already registered under this key.
            registered = collected.setdefault(resource_key, resource_def)
            if registered != resource_def:
                raise DagsterInvalidDefinitionError(
                    f"Conflicting versions of resource with key '{resource_key}' "
                    "were provided to different assets. When constructing a "
                    "job, all resource definitions provided to assets must "
                    "match by reference equality for a given key."
                )
    return collected
62 changes: 48 additions & 14 deletions python_modules/dagster/dagster/core/asset_defs/source_asset.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import NamedTuple, Optional, Sequence, Union
from typing import Dict, Mapping, NamedTuple, Optional, Sequence, Union, cast

import dagster._check as check
from dagster.core.definitions.events import AssetKey, CoerceableToAssetKey
Expand All @@ -10,10 +10,12 @@
normalize_metadata,
)
from dagster.core.definitions.partition import PartitionsDefinition
from dagster.core.definitions.resource_definition import ResourceDefinition
from dagster.core.definitions.resource_requirement import ResourceAddable
from dagster.core.definitions.utils import validate_group_name
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.storage.io_manager import IOManagerDefinition
from dagster.utils import merge_dicts


class SourceAsset(
Expand All @@ -23,10 +25,10 @@ class SourceAsset(
("key", AssetKey),
("metadata_entries", Sequence[Union[MetadataEntry, PartitionMetadataEntry]]),
("io_manager_key", Optional[str]),
("io_manager_def", Optional[IOManagerDefinition]),
("description", Optional[str]),
("partitions_def", Optional[PartitionsDefinition]),
("group_name", str),
("resource_defs", Dict[str, ResourceDefinition]),
],
),
ResourceAddable,
Expand Down Expand Up @@ -55,24 +57,37 @@ def __new__(
partitions_def: Optional[PartitionsDefinition] = None,
_metadata_entries: Optional[Sequence[Union[MetadataEntry, PartitionMetadataEntry]]] = None,
group_name: Optional[str] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
):

key = AssetKey.from_coerceable(key)
metadata = check.opt_dict_param(metadata, "metadata", key_type=str)
metadata_entries = _metadata_entries or normalize_metadata(metadata, [], allow_invalid=True)
resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs"))
io_manager_def = check.opt_inst_param(io_manager_def, "io_manager_def", IOManagerDefinition)
if io_manager_def:
if not io_manager_key:
source_asset_path = "__".join(key.path)
io_manager_key = f"{source_asset_path}__io_manager"

if io_manager_key in resource_defs and resource_defs[io_manager_key] != io_manager_def:
raise DagsterInvalidDefinitionError(
f"Provided conflicting definitions for io manager key '{io_manager_key}'. Please provide only one definition per key."
)

resource_defs[io_manager_key] = io_manager_def

return super().__new__(
cls,
key=key,
metadata_entries=metadata_entries,
io_manager_key=check.opt_str_param(io_manager_key, "io_manager_key"),
io_manager_def=check.opt_inst_param(
io_manager_def, "io_manager_def", IOManagerDefinition
),
description=check.opt_str_param(description, "description"),
partitions_def=check.opt_inst_param(
partitions_def, "partitions_def", PartitionsDefinition
),
group_name=validate_group_name(group_name),
resource_defs=resource_defs,
)

@property
Expand All @@ -81,26 +96,45 @@ def metadata(self) -> MetadataMapping:
return {entry.label: entry.entry_data for entry in self.metadata_entries} # type: ignore

def get_io_manager_key(self) -> str:
if not self.io_manager_key and not self.io_manager_def:
return "io_manager"
source_asset_path = "__".join(self.key.path)
return self.io_manager_key or f"{source_asset_path}__io_manager"
return self.io_manager_key or "io_manager"

@property
def io_manager_def(self) -> Optional[IOManagerDefinition]:
    """Return the resource registered under this asset's IO manager key, if any.

    The key comes from ``self.get_io_manager_key()``; the result is ``None``
    when that key is falsy or has no entry in ``self.resource_defs``.
    """
    key = self.get_io_manager_key()
    found = self.resource_defs.get(key) if key else None
    # cast only informs the type checker; the looked-up value is returned as-is.
    return cast(Optional[IOManagerDefinition], found)

def with_resources(self, resource_defs) -> "SourceAsset":
if self.io_manager_def:
return self
from dagster.core.execution.resources_init import get_transitive_required_resource_keys

merged_resource_defs = merge_dicts(resource_defs, self.resource_defs)

io_manager_def = resource_defs.get(self.get_io_manager_key())
io_manager_def = merged_resource_defs.get(self.get_io_manager_key())
if not io_manager_def and self.get_io_manager_key() != "io_manager":
raise DagsterInvalidDefinitionError(
f"SourceAsset with asset key {self.key} requires IO manager with key '{self.get_io_manager_key()}', but none was provided."
)
relevant_keys = get_transitive_required_resource_keys(
{self.get_io_manager_key()}, merged_resource_defs
)

relevant_resource_defs = {
key: resource_def
for key, resource_def in merged_resource_defs.items()
if key in relevant_keys
}

io_manager_key = (
self.get_io_manager_key() if self.get_io_manager_key() != "io_manager" else None
)
return SourceAsset(
key=self.key,
io_manager_def=io_manager_def,
io_manager_key=self.get_io_manager_key(),
io_manager_key=io_manager_key,
description=self.description,
partitions_def=self.partitions_def,
_metadata_entries=self.metadata_entries,
resource_defs=relevant_resource_defs,
group_name=self.group_name,
)
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,9 @@ def retry_policy(self) -> Optional[RetryPolicy]:

def get_resource_requirements(
self,
asset_layer: "AssetLayer",
outer_container: "GraphDefinition",
parent_handle: Optional["NodeHandle"] = None,
asset_layer: Optional["AssetLayer"] = None,
) -> Iterator["ResourceRequirement"]:
from .resource_requirement import InputManagerRequirement

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,9 @@ def parent_graph_def(self) -> Optional["GraphDefinition"]:
def is_subselected(self) -> bool:
return False

def get_resource_requirements(self, asset_layer: "AssetLayer") -> Iterator[ResourceRequirement]:
def get_resource_requirements(
self, asset_layer: Optional["AssetLayer"] = None
) -> Iterator[ResourceRequirement]:
for node in self.node_dict.values():
yield from node.get_resource_requirements(outer_container=self, asset_layer=asset_layer)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def __init__(
self._graph_def.get_inputs_must_be_resolved_top_level(self._asset_layer)

def _get_resource_requirements_for_mode(self, mode_def: ModeDefinition) -> Set[str]:
from ..execution.resources_init import get_dependencies, resolve_resource_dependencies
from ..execution.resources_init import get_transitive_required_resource_keys

requirements = list(self._graph_def.get_resource_requirements(self.asset_layer))
for hook_def in self._hook_defs:
Expand All @@ -283,23 +283,10 @@ def _get_resource_requirements_for_mode(self, mode_def: ModeDefinition) -> Set[s
)
)
ensure_requirements_satisfied(mode_def.resource_defs, requirements, mode_def.name)
required_keys = sorted([requirement.key for requirement in requirements])
resource_dependencies = resolve_resource_dependencies(mode_def.resource_defs)
seen = set()
while required_keys:
required_key = required_keys.pop()
if required_key in seen:
continue
seen.add(required_key)
for requirement in mode_def.resource_defs[required_key].get_resource_requirements(
outer_context=required_key
):
ensure_requirements_satisfied(mode_def.resource_defs, [requirement], mode_def.name)
requirements.append(requirement)

required_keys += get_dependencies(required_key, resource_dependencies)

return set([requirement.key for requirement in requirements])
required_keys = {requirement.key for requirement in requirements}
return required_keys.union(
get_transitive_required_resource_keys(required_keys, mode_def.resource_defs)
)

@property
def name(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def get_resource_requirements(
outer_context: Optional[object] = None,
) -> Iterator[ResourceRequirement]:
# Outer requiree in this context is the outer-calling node handle. If not provided, then just use the solid name.
outer_context = cast(Optional[Tuple[NodeHandle, "AssetLayer"]], outer_context)
outer_context = cast(Optional[Tuple[NodeHandle, Optional["AssetLayer"]]], outer_context)
if not outer_context:
handle = None
asset_layer = None
Expand Down

0 comments on commit a1a31d4

Please sign in to comment.