Skip to content

Commit

Permalink
AssetsDefinition.asset_keys -> keys and similar (#8325)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jun 13, 2022
1 parent 9d11ca4 commit 7bdf0ce
Show file tree
Hide file tree
Showing 18 changed files with 105 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@


def test_basic():
assert len(my_function.asset_keys) == 2
assert len(my_function.keys) == 2


def test_io_manager():
assert len(my_assets.asset_keys) == 2
assert len(my_assets.keys) == 2


def test_subset():
assert len(split_actions.asset_keys) == 2
assert len(split_actions.keys) == 2


def test_inter_asset_deps():
assert len(my_complex_assets.asset_keys) == 2
assert len(my_complex_assets.keys) == 2
12 changes: 6 additions & 6 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def asset1():
...
result = AssetGroup([asset1]).prefixed("my_prefix")
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"])
assert result.assets[0].key == AssetKey(["my_prefix", "asset1"])
Example with dependencies within the list of assets:
Expand All @@ -450,9 +450,9 @@ def asset2(asset1):
...
result = AssetGroup([asset1, asset2]).prefixed("my_prefix")
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"])
assert result.assets[1].asset_key == AssetKey(["my_prefix", "asset2"])
assert result.assets[1].dependency_asset_keys == {AssetKey(["my_prefix", "asset1"])}
assert result.assets[0].key == AssetKey(["my_prefix", "asset1"])
assert result.assets[1].key == AssetKey(["my_prefix", "asset2"])
assert result.assets[1].dependency_keys == {AssetKey(["my_prefix", "asset1"])}
Examples with input prefixes provided by source assets:
Expand All @@ -466,8 +466,8 @@ def asset2(asset1):
result = AssetGroup([asset2], source_assets=[asset1]).prefixed("my_prefix")
assert len(result.assets) == 1
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset2"])
assert result.assets[0].dependency_asset_keys == {AssetKey(["upstream_prefix", "asset1"])}
assert result.assets[0].key == AssetKey(["my_prefix", "asset2"])
assert result.assets[0].dependency_keys == {AssetKey(["upstream_prefix", "asset1"])}
assert result.source_assets[0].key == AssetKey(["upstream_prefix", "asset1"])
"""
prefixed_assets = prefix_assets(self.assets, key_prefix)
Expand Down
11 changes: 6 additions & 5 deletions python_modules/dagster/dagster/core/asset_defs/asset_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,34 @@ class AssetIn(
NamedTuple(
"_AssetIn",
[
("asset_key", Optional[AssetKey]),
("key", Optional[AssetKey]),
("metadata", Optional[Mapping[str, Any]]),
("key_prefix", Optional[Sequence[str]]),
],
)
):
def __new__(
cls,
asset_key: Optional[CoercibleToAssetKey] = None,
key: Optional[CoercibleToAssetKey] = None,
metadata: Optional[Mapping[str, Any]] = None,
namespace: Optional[Sequence[str]] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
asset_key: Optional[CoercibleToAssetKey] = None,
):
key_prefix = canonicalize_backcompat_args(
key_prefix, "key_prefix", namespace, "namespace", "0.16.0"
)
key = canonicalize_backcompat_args(key, "key", asset_key, "asset_key", "0.16.0")
if isinstance(key_prefix, str):
key_prefix = [key_prefix]

check.invariant(
not (asset_key and key_prefix),
("Asset key and key_prefix cannot both be set on AssetIn"),
not (key and key_prefix), "key and key_prefix cannot both be set on AssetIn"
)

return super(AssetIn, cls).__new__(
cls,
asset_key=AssetKey.from_coerceable(asset_key) if asset_key is not None else None,
key=AssetKey.from_coerceable(key) if key is not None else None,
metadata=check.opt_inst_param(metadata, "metadata", Mapping),
key_prefix=check.opt_list_param(key_prefix, "key_prefix", of_type=str),
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ def all() -> "AllAssetSelection":

@staticmethod
def assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection":
return KeysAssetSelection(
*(key for assets_def in assets_defs for key in assets_def.asset_keys)
)
return KeysAssetSelection(*(key for assets_def in assets_defs for key in assets_def.keys))

@staticmethod
def keys(*asset_keys: CoercibleToAssetKey) -> "KeysAssetSelection":
Expand Down
33 changes: 24 additions & 9 deletions python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dagster.core.definitions.utils import DEFAULT_GROUP_NAME, validate_group_name
from dagster.core.execution.context.compute import OpExecutionContext
from dagster.utils import merge_dicts
from dagster.utils.backcompat import deprecation_warning

from ..definitions.resource_requirement import (
ResourceAddable,
Expand Down Expand Up @@ -207,27 +208,41 @@ def asset_deps(self) -> Mapping[AssetKey, AbstractSet[AssetKey]]:
return self._asset_deps

@property
def asset_key(self) -> AssetKey:
def key(self) -> AssetKey:
check.invariant(
len(self.asset_keys) == 1,
len(self.keys) == 1,
"Tried to retrieve asset key from an assets definition with multiple asset keys: "
+ ", ".join([str(ak.to_string()) for ak in self._asset_keys_by_output_name.values()]),
)

return next(iter(self.asset_keys))
return next(iter(self.keys))

@property
def asset_key(self) -> AssetKey:
deprecation_warning(
"AssetsDefinition.asset_key", "0.16.0", "Use AssetsDefinition.key instead."
)
return self.key

@property
def resource_defs(self) -> Dict[str, ResourceDefinition]:
return dict(self._resource_defs)

@property
def asset_keys(self) -> AbstractSet[AssetKey]:
def keys(self) -> AbstractSet[AssetKey]:
return self._selected_asset_keys

@property
def dependency_asset_keys(self) -> Iterable[AssetKey]:
def asset_keys(self) -> AbstractSet[AssetKey]:
deprecation_warning(
"AssetsDefinition.asset_keys", "0.16.0", "Use AssetsDefinition.keys instead."
)
return self.keys

@property
def dependency_keys(self) -> Iterable[AssetKey]:
# the input asset keys that are directly upstream of a selected asset key
upstream_keys = set().union(*(self.asset_deps[key] for key in self.asset_keys))
upstream_keys = set().union(*(self.asset_deps[key] for key in self.keys))
input_keys = set(self._asset_keys_by_input_name.values())
return upstream_keys.intersection(input_keys)

Expand All @@ -246,12 +261,12 @@ def asset_keys_by_output_name(self) -> Mapping[str, AssetKey]:
return {
name: key
for name, key in self.node_asset_keys_by_output_name.items()
if key in self.asset_keys
if key in self.keys
}

@property
def asset_keys_by_input_name(self) -> Mapping[str, AssetKey]:
upstream_keys = set().union(*(self.asset_deps[key] for key in self.asset_keys))
upstream_keys = set().union(*(self.asset_deps[key] for key in self.keys))
return {
name: key
for name, key in self.node_asset_keys_by_input_name.items()
Expand Down Expand Up @@ -367,7 +382,7 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin
partition_mappings=self._partition_mappings,
asset_deps=self._asset_deps,
can_subset=self.can_subset,
selected_asset_keys=selected_asset_keys & self.asset_keys,
selected_asset_keys=selected_asset_keys & self.keys,
resource_defs=self.resource_defs,
)

Expand Down
12 changes: 6 additions & 6 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ def build_job_partitions_from_assets(
first_assets_with_partitions_def: AssetsDefinition = assets_with_partitions_defs[0]
for assets_def in assets_with_partitions_defs:
if assets_def.partitions_def != first_assets_with_partitions_def.partitions_def:
first_asset_key = next(iter(assets_def.asset_keys)).to_string()
second_asset_key = next(iter(first_assets_with_partitions_def.asset_keys)).to_string()
first_asset_key = next(iter(assets_def.keys)).to_string()
second_asset_key = next(iter(first_assets_with_partitions_def.keys)).to_string()
raise DagsterInvalidDefinitionError(
"When an assets job contains multiple partitions assets, they must have the "
f"same partitions definitions, but asset '{first_asset_key}' and asset "
Expand Down Expand Up @@ -194,7 +194,7 @@ def build_deps(
Mapping[NodeHandle, AssetsDefinition],
]:
# sort so that nodes get a consistent name
assets_defs = sorted(assets_defs, key=lambda ad: (sorted((ak for ak in ad.asset_keys))))
assets_defs = sorted(assets_defs, key=lambda ad: (sorted((ak for ak in ad.keys))))

# if the same graph/op is used in multiple assets_definitions, their invocations must have
# different names. we keep track of definitions that share a name and add a suffix to their
Expand Down Expand Up @@ -240,7 +240,7 @@ def build_deps(
if not input_def.dagster_type.is_nothing:
raise DagsterInvalidDefinitionError(
f"Input asset '{upstream_asset_key.to_string()}' for asset "
f"'{next(iter(assets_def.asset_keys)).to_string()}' is not "
f"'{next(iter(assets_def.keys)).to_string()}' is not "
"produced by any of the provided asset ops and is not one of the provided "
"sources"
)
Expand Down Expand Up @@ -292,7 +292,7 @@ def _attempt_resolve_cycles(
# index AssetsDefinitions by their asset names
assets_defs_by_asset_name = {}
for assets_def in assets_defs:
for asset_key in assets_def.asset_keys:
for asset_key in assets_def.keys:
assets_defs_by_asset_name[asset_key.to_user_string()] = assets_def

# color for each asset
Expand All @@ -302,7 +302,7 @@ def _attempt_resolve_cycles(
def _dfs(name, cur_color):
colors[name] = cur_color
if name in assets_defs_by_asset_name:
cur_node_asset_keys = assets_defs_by_asset_name[name].asset_keys
cur_node_asset_keys = assets_defs_by_asset_name[name].keys
else:
# in a SourceAsset, treat all downstream as if they're in the same node
cur_node_asset_keys = asset_deps["downstream"][name]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ def build_asset_ins(
asset_key = None

if input_name in asset_ins:
asset_key = asset_ins[input_name].asset_key
asset_key = asset_ins[input_name].key
metadata = asset_ins[input_name].metadata or {}
key_prefix = asset_ins[input_name].key_prefix
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def assets_and_source_assets_from_modules(
for asset in _find_assets_in_module(module):
if id(asset) not in asset_ids:
asset_ids.add(id(asset))
keys = asset.asset_keys if isinstance(asset, AssetsDefinition) else [asset.key]
keys = asset.keys if isinstance(asset, AssetsDefinition) else [asset.key]
for key in keys:
if key in asset_keys:
modules_str = ", ".join(set([asset_keys[key].__name__, module.__name__]))
Expand Down Expand Up @@ -100,7 +100,7 @@ def load_assets_from_modules(
if group_name:
assets = [
asset.with_prefix_or_group(
group_names={asset_key: group_name for asset_key in asset.asset_keys}
group_names={asset_key: group_name for asset_key in asset.keys}
)
for asset in assets
]
Expand Down Expand Up @@ -194,7 +194,7 @@ def load_assets_from_package_module(
if group_name:
assets = [
asset.with_prefix_or_group(
group_names={asset_key: group_name for asset_key in asset.asset_keys}
group_names={asset_key: group_name for asset_key in asset.keys}
)
for asset in assets
]
Expand Down Expand Up @@ -285,10 +285,10 @@ def asset2(asset1):
result = prefixed_asset_key_replacements([asset1, asset2], "my_prefix")
assert result.assets[0].key == AssetKey(["my_prefix", "asset1"])
assert result.assets[1].key == AssetKey(["my_prefix", "asset2"])
assert result.assets[1].dependency_asset_keys == {AssetKey(["my_prefix", "asset1"])}
assert result.assets[1].dependency_keys == {AssetKey(["my_prefix", "asset1"])}
"""
asset_keys = {asset_key for assets_def in assets_defs for asset_key in assets_def.asset_keys}
asset_keys = {asset_key for assets_def in assets_defs for asset_key in assets_def.keys}

if isinstance(key_prefix, str):
key_prefix = [key_prefix]
Expand All @@ -297,10 +297,10 @@ def asset2(asset1):
result_assets: List[AssetsDefinition] = []
for assets_def in assets_defs:
output_asset_key_replacements = {
asset_key: AssetKey(key_prefix + asset_key.path) for asset_key in assets_def.asset_keys
asset_key: AssetKey(key_prefix + asset_key.path) for asset_key in assets_def.keys
}
input_asset_key_replacements = {}
for dep_asset_key in assets_def.dependency_asset_keys:
for dep_asset_key in assets_def.dependency_keys:
if dep_asset_key in asset_keys:
input_asset_key_replacements[dep_asset_key] = AssetKey(
key_prefix + dep_asset_key.path
Expand Down
14 changes: 7 additions & 7 deletions python_modules/dagster/dagster/core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def _asset_key_to_dep_node_handles(

# handle internal_asset_deps
for node_handle, assets_defs in assets_defs_by_node_handle.items():
all_output_asset_keys = assets_defs.asset_keys
all_output_asset_keys = assets_defs.keys
for asset_key, dep_asset_keys in assets_defs.asset_deps.items():
for dep_asset_key in [key for key in dep_asset_keys if key in all_output_asset_keys]:
output_node = dep_nodes_by_asset_key[asset_key][
Expand Down Expand Up @@ -441,7 +441,7 @@ def __init__(
self._assets_defs_by_key = {
key: assets_def
for assets_def in check.opt_list_param(assets_defs, "assets_defs")
for key in assets_def.asset_keys
for key in assets_def.keys
}

# keep an index from node handle to all keys expected to be generated in that node
Expand Down Expand Up @@ -521,7 +521,7 @@ def from_graph_and_assets_node_mapping(
asset_key,
partitions_fn=partition_fn if assets_def.partitions_def else None,
partitions_def=assets_def.partitions_def,
is_required=asset_key in assets_def.asset_keys,
is_required=asset_key in assets_def.keys,
)
io_manager_by_asset[asset_key] = inner_output_def.io_manager_key

Expand Down Expand Up @@ -700,10 +700,10 @@ def _subset_assets_defs(

for asset in set(assets):
# intersection
selected_subset = selected_asset_keys & asset.asset_keys
selected_subset = selected_asset_keys & asset.keys
included_keys.update(selected_subset)
# all assets in this def are selected
if selected_subset == asset.asset_keys:
if selected_subset == asset.keys:
included_assets.add(asset)
# no assets in this def are selected
elif len(selected_subset) == 0:
Expand All @@ -713,11 +713,11 @@ def _subset_assets_defs(
subset_asset = asset.subset_for(selected_asset_keys)
included_assets.add(subset_asset)
# subset of the asset that we don't want
excluded_assets.add(asset.subset_for(asset.asset_keys - subset_asset.asset_keys))
excluded_assets.add(asset.subset_for(asset.keys - subset_asset.keys))
else:
raise DagsterInvalidSubsetError(
f"When building job, the AssetsDefinition '{asset.node_def.name}' "
f"contains asset keys {sorted(list(asset.asset_keys))}, but "
f"contains asset keys {sorted(list(asset.keys))}, but "
f"attempted to select only {sorted(list(selected_subset))}. "
"This AssetsDefinition does not support subsetting. Please select all "
"asset keys produced by this asset."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def generate_asset_dep_graph(assets_defs: Iterable["AssetsDefinition"]) -> Depen
upstream: Dict[str, Set[str]] = {}
downstream: Dict[str, Set[str]] = {}
for assets_def in assets_defs:
for asset_key in assets_def.asset_keys:
for asset_key in assets_def.keys:
asset_name = asset_key.to_user_string()
upstream[asset_name] = set()
downstream[asset_name] = downstream.get(asset_name, set())
Expand Down Expand Up @@ -232,7 +232,7 @@ def generate_asset_name_to_definition_map(
) -> Mapping[str, "AssetsDefinition"]:
asset_name_map = {}
for assets_def in assets_defs:
for asset_key in assets_def.asset_keys:
for asset_key in assets_def.keys:
asset_name = asset_key.to_user_string()
asset_name_map[asset_name] = assets_def
return asset_name_map
Expand All @@ -247,7 +247,7 @@ def fetch_connected_assets_definitions(
depth: Optional[int] = MAX_NUM,
) -> FrozenSet["AssetsDefinition"]:
depth = MAX_NUM if depth is None else depth
names = [asset_key.to_user_string() for asset_key in asset.asset_keys]
names = [asset_key.to_user_string() for asset_key in asset.keys]
connected_names = [
n for name in names for n in fetch_connected(name, graph, direction=direction, depth=depth)
]
Expand Down Expand Up @@ -490,7 +490,7 @@ def parse_asset_selection(

# special case: select *
if len(asset_selection) == 1 and asset_selection[0] == "*":
return frozenset(set().union(*(ad.asset_keys for ad in assets_defs)))
return frozenset(set().union(*(ad.keys for ad in assets_defs)))

graph = generate_asset_dep_graph(assets_defs)
assets_set: Set[str] = set()
Expand Down

0 comments on commit 7bdf0ce

Please sign in to comment.